1. Callable

泛型接口,用于获取线程执行完的结果,Callable的声明如下

  1. public interface Callable<V> {
  2. // 返回 V 类型的结果
  3. V call() throws Exception;
  4. }

Callable 接口类似于Runnable,两者都是为那些其实例可能被另一个线程执行的类设计的。但是 Runnable 不会返回结果,并且无法抛出经过检查的异常,而Callable是可返回结果并且可以抛出异常的任务

  1. public class MyCallable implements Callable{
  2. private int number;
  3. public MyCallable(int number) {
  4. this.number = number;
  5. }
  6. @Override
  7. public Integer call() throws Exception {
  8. int sum = 0;
  9. for (int x = 1; x <= number; x++) {
  10. sum += x;
  11. }
  12. return sum;
  13. }
  14. }
  15. public class CallableDemo {
  16. ExecutorService pool = Executors.newFixedThreadPool(2);
  17. Future<Integer> future = pool.submit(new SumCallable(10));
  18. Integer sum = future.get();
  19. pool.shutdown();
  20. class SumCallable implements Callable<Integer> {
  21. private int number;
  22. public SumCallable(int number){
  23. this.number = number;
  24. }
  25. @Override
  26. public Integer call() throws Exception {
  27. int sum = 0;
  28. for (int i=0; i<number; i++){
  29. sum += i;
  30. }
  31. return sum;
  32. }
  33. }
  34. }

2. Future

Future 为线程池提供了一个可管理的任务标准,它提供了对Runnable或Callable任务的执行结果进行取消、查询是否完成、获取结果、设置结果操作

Future 接口表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。计算完成后只能使用get()方法来获取结果,如有必要,计算完成前可以阻塞此方法

Future取得的结果类型和Callable返回的结果类型必须一致,这是通过泛型来实现的。Callable要采用ExecutorService的submit()方法提交,返回的future对象可以取消任务。Future声明如下

  1. public interface Future<V> {
  2. boolean cancel(boolean var1);
  3. // 该任务是否已经取消
  4. boolean isCancelled();
  5. // 判断是否已经完成
  6. boolean isDone();
  7. // 获取结果,该方法会阻塞
  8. V get() throws InterruptedException, ExecutionException;
  9. // 获取结果,如果还未完成那么等待,直到timeout或返回结果,该方法会阻塞
  10. V get(long var1, TimeUnit var3) throws InterruptedException, ExecutionException, TimeoutException;
  11. }

Future的简单使用

  1. // 创建线程池对象
  2. ExecutorService pool = Executors.newFixedThreadPool(2);
  3. // 可以执行Runnable对象或者Callable对象代表的线程
  4. Future<Integer> future = pool.submit(new MyCallable(100));
  5. Integer i1 = future.get();
  6. pool.shutdown();

Future常用方法

方法声明 功能描述
cancel() 取消任务
isCanceled() 任务是否取消
isDone() 任务是否完成
get() 获取结果,get()方法会阻塞,直到任务返回结果
set() 设置结果

3. FutureTask

Future只是定义了一些接口规范,而FutureTask则是它的实现类

  1. public class FutureTask<V> implements RunnableFuture<V>{
  2. // 代码省略
  3. }
  4. public interface RunnableFuture<V> extends Runnable, Future<V>{
  5. void run();
  6. }

FutureTask会像Thread包装Runnable那样对Runnable和Callable进行包装,Runnable和Callable由构造函数注入

  1. public FutureTask(Callable<V> callable) {
  2. if (callable == null)
  3. throw new NullPointerException();
  4. this.callable = callable;
  5. this.state = NEW; // ensure visibility of callable
  6. }
  1. public FutureTask(Runnable runnable, V result) {
  2. this.callable = Executors.callable(runnable, result);
  3. this.state = NEW; // ensure visibility of callable
  4. }

由于FutureTask实现了Runnable,因此,它既可以通过Thread包装来直接执行,也可以提交给ExecuteService来执行,并且还可以直接通过get()方法获取执行结果,该方法会阻塞,直到结果返回。因此,FutureTask既是Future、Runnable,又是包装了Callable,它是两者的合体

  1. public class FutureDemo {
  2. // 线程池
  3. static ExecutorService mExecutor = Executors.newSingleThreadExecutor();
  4. // main函数
  5. public static void main(String[] args) {
  6. try {
  7. futureWithRunnable();
  8. futureWithCallable();
  9. futureTask();
  10. } catch (Exception e) {
  11. }
  12. }
  13. /**
  14. * 其中Runnable实现的是void run()方法,无返回值;Callable实现的是 V
  15. * call()方法,并且可以返回执行结果。其中Runnable可以提交给Thread来包装下
  16. * ,直接启动一个线程来执行,而Callable则一般都是提交给ExecuteService来执行。
  17. */
  18. private static void futureWithRunnable() throws InterruptedException, ExecutionException {
  19. // 提交runnable则没有返回值, future没有数据
  20. Future<?> result = mExecutor.submit(new Runnable() {
  21. @Override
  22. public void run() {
  23. fibc(20);
  24. }
  25. });
  26. System.out.println("future result from runnable : " + result.get());
  27. }
  28. private static void futureWithCallable() throws InterruptedException, ExecutionException {
  29. /**
  30. * 提交Callable, 有返回值, future中能够获取返回值
  31. */
  32. Future<Integer> result2 = mExecutor.submit(new Callable<Integer>() {
  33. @Override
  34. public Integer call() throws Exception {
  35. return fibc(20);
  36. }
  37. });
  38. System.out.println("future result from callable : "
  39. + result2.get());
  40. }
  41. private static void futureTask() throws InterruptedException, ExecutionException {
  42. /**
  43. * FutureTask则是一个RunnableFuture<V>,即实现了Runnbale又实现了Futrue<V>这两个接口,
  44. * 另外它还可以包装Runnable(实际上会转换为Callable)和Callable
  45. * <V>,所以一般来讲是一个符合体了,它可以通过Thread包装来直接执行,也可以提交给ExecuteService来执行
  46. * ,并且还可以通过v get()返回执行结果,在线程体没有执行完成的时候,主线程一直阻塞等待,执行完则直接返回结果。
  47. */
  48. FutureTask<Integer> futureTask = new FutureTask<Integer>(
  49. new Callable<Integer>() {
  50. @Override
  51. public Integer call() throws Exception {
  52. return fibc(20);
  53. }
  54. });
  55. // 提交futureTask
  56. mExecutor.submit(futureTask);
  57. System.out.println("future result from futureTask : "
  58. + futureTask.get());
  59. }
  60. // 效率底下的斐波那契数列, 耗时的操作
  61. private static int fibc(int num) {
  62. if (num == 0) {
  63. return 0;
  64. }
  65. if (num == 1) {
  66. return 1;
  67. }
  68. return fibc(num - 1) + fibc(num - 2);
  69. }
  70. }

输出结果

  1. future result from runnable : null
  2. future result from Callable : 6765
  3. future result from futureTask : 6765

4. CompletionService

CompletionService用于提交一组Callable任务,其take()方法返回已完成的一个Callable任务对应的Future对象。

示例:这里数据的获取好比同时种了好几块地的麦子,然后等待收割,秋收时,哪块先熟,先收割哪块

  1. import java.util.Random;
  2. import java.util.concurrent.Callable;
  3. import java.util.concurrent.CompletionService;
  4. import java.util.concurrent.ExecutionException;
  5. import java.util.concurrent.ExecutorCompletionService;
  6. import java.util.concurrent.ExecutorService;
  7. import java.util.concurrent.Executors;
  8. import java.util.concurrent.Future;
  9. import java.util.concurrent.TimeUnit;
  10. import java.util.concurrent.TimeoutException;
  11. public class CallableAndFuture {
  12. public static void main(String[] args){
  13.   //创建一个单独的线程
  14.   ExecutorService threadPool = Executors.newSingleThreadExecutor();
  15.   //future泛型与Callable的类型一致
  16.   Future<String> future = threadPool.submit(new Callable<String>(){
  17. @Override
  18. public String call() throws Exception {
  19. Thread.sleep(3000);
  20. return "hello";
  21. }
  22. });
  23. System.out.println("等待結果……");
  24. //在指定时timeout内等待,未等到抛出TimeoutException
  25. //System.out.println("拿到结果:" + future.get(long timeout,TimeUnit unit));
  26. try {
  27. System.out.println("拿到结果:" + future.get()); //获取线程执行后的结果
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. } catch (ExecutionException e) {
  31. e.printStackTrace();
  32. }
  33. //CompletionService用于提交一组Callable任务,
  34. //其take方法返回已完成的一个Callable任务对应的Future对象。
  35. ExecutorService threadPool2 = Executors.newFixedThreadPool(10);
  36. CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(threadPool2);
  37.    //创建10任务
  38. for(int i=1;i<=10;i++){
  39. final int seq = i;
  40.    //将任务提交给线程池
  41. completionService.submit(new Callable<Integer>(){
  42. @Override
  43. public Integer call() throws Exception {
  44. Thread.sleep(new Random().nextInt(5000));
  45. return seq;
  46. }
  47. });
  48. }
  49. //获取结果,哪个结果先返回就先获得
  50. for(int i=0;i<10;i++){
  51. try {
  52. System.out.println(completionService.take().get());
  53. } catch (InterruptedException e) {
  54. e.printStackTrace();
  55. } catch (ExecutionException e) {
  56. e.printStackTrace();
  57. }
  58. }
  59. }
  60. }

输出结果

  1. 等待結果……
  2. 拿到结果:hello
  3. 3
  4. 6
  5. 10
  6. 2
  7. 4
  8. 8
  9. 5
  10. 1
  11. 7
  12. 9